Reliable Rx
IRPは、Reaqtorなどのストリーミングイベント処理システムで使用されるコアフレームワークライブラリの一部です。IRPの主な役割は、ストリーミング・イベント・プロセッシング・システムの抽象化を定義することであり、サービスの実装によってRaaS(Reactive as a Service)の実現を可能にします。
IRP is part of the core framework libraries used by streaming event processing systems such as Reaqtor. The main role of IRP is to define an abstraction for streaming event processing systems, enabling service implementations to achieve “Reactive as a Service” (RaaS).
コアフレームワークは、IRPの抽象化と実装の構成要素に加えて、式を操作するためのさまざまなユーティリティー(正規化、バインディング、コンパイル、評価を支援する)や、アプリケーションやサービスに組み込むことができるポータブルなイベント処理エンジンの実装も提供します。
In addition to the IRP abstractions and implementation building blocks, the core framework also provides a variety of utilities to manipulate expressions (to aid in normalization, binding, compilation, and evaluation), and implementations of portable event processing engines that can be embedded in applications and services.
Rx代数をベースにしたコアイベント処理エンジンを構築する際、いくつかの重要な特性が出てきました。
When building a core event processing engine based on the Rx algebra, a number of key properties came up:
サービス、アプリケーション、デバイスでのホスティングを可能にする高いポータブル性。
Highly portable to enable hosting in services, applications, and devices.
最小限の依存関係と、I/Oなどの適切な抽象化により、簡単に統合できる。
Minimal dependencies and proper abstractions for I/O etc. for easy integration.
コンピュートノードの故障やスワップアウトが発生した場合のイベント処理の信頼性。
Reliability of event processing in the case of compute node failure or swap out.
クラウド上のコンピュートノードで何百万ものスタンディングクエリをホストするための高い密度。
High density to host millions of standing queries on compute nodes in the cloud.
リアクティブ・コア・エンジンの初期実装は、2013年にBart De SmetとBrian Beckmanの共同作業で開始され、主にコア・エンジンの信頼性向上に注力しました。
The initial implementation efforts for the core reactive engine started in 2013 in a collaboration between Bart De Smet and Brian Beckman where we mainly focused on the reliability aspects of the core engine.
2013年初頭のイベントストリーム処理の状況は、2021年の現在、当たり前のように使われているものとは大きく異なっていたことを指摘しておきましょう。当時の「最先端」のシステムの一つがStormでした。StormとReaqtorのような技術には、設計目標の大きな違いがあり、信頼性に対するアプローチもその一つでした。
It is worth pointing out that the landscape of event stream processing in early 2013 was vastly different from what we take for granted now, in 2021. One of the “state of the art” systems back then was Storm. There were major design goal differences between technologies like Storm versus Reaqtor, and the approach to reliability was one of them.
信頼性を実現するためには、イベントにシーケンス識別子を導入する必要があるとすぐに思い至りました。初期のプロトタイプでは、Rx抽象化の信頼性の高いカウンターパートを使用して、Rx代数にシーケンス識別子を流すというオプションを検討しました。
In order to achieve reliability, we quickly landed on the necessity for introducing sequence identifiers for events. Early prototypes explored the option to flow sequence identifiers through the Rx algebra using a reliable counterpart to the Rx abstractions:
code:c#
interface IReliableObserver<in T>
{
void OnNext(long id, T value);
void OnError(long id, Exception error);
void OnCompleted(long id);
}
interface IReliableObservable<out T>
{
IReliableDisposable Subscribe(IReliableObserver<T> observer);
}
interface IReliableDisposable : IDisposable
{
void Start(long sequenceId);
void Acknowledge(long sequenceId);
}
この試作をきっかけに、いくつかの気付きがありました。
This prototype led to a couple of realizations:
特にWindowやMergeのような高次のシーケンスを持つ複雑な演算子の場合、これらの抽象的な概念に基づいて演算子を合成しても、うまくいかないことがあります。特に、単調に増加するシーケンス識別子を確保するには、オペレーターが使用するロギング機能を想定しなければならず、コアのイベント処理パイプラインにI/O要件が発生します。ベクトルクロックが検討されましたが、状態空間は限定されていません。インターバルツリークロックのような代替案は、状態の爆発を抑えるのに役立ちますが、複雑さが増します。
Composing operators over these abstractions has limited mileage, especially for complex operators such as Window and Merge which exhibit higher order sequences. In particular, ensuring monotonically increasing sequence identifiers isn’t trivial without assumptions on logging capabilities for use by operators, introducing I/O requirements in the core event processing pipeline. Vector clocks were considered but state space isn’t bounded; alternatives such as interval tree clocks can help to reduce state explosion but introduce additional complexity.
イベントの配信と処理の信頼性は、パズルの一部に過ぎません。状態空間を整理するためには、無制限の再生に頼るのではなく、オペレータの状態を定期的に要約する能力が不可欠です。これをサポートするために、オペレータは状態を永続化する手段を提供する必要があり、実行中の計算は、状態の永続化に成功した後に再生バッファを刈り取る方法が必要です。
Reliability of event delivery and processing is only one part of the puzzle; the ability to summarize operator state periodically rather than relying on unbounded replay is essential to prune the state space. To support this, operators need to provide means to persist state and running computations need a way to prune the replay buffer after successfully persisting state.
RxにおけるIDisposableの役割は,実際には,計算の停止とリソースの廃棄を目的として,オペレータグラフを訪問するための手段です.信頼性の高いRxのようなより複雑な実装では,問い合わせ演算子インスタンスのライフサイクルにおいて,追加のビジターパスが必要になります.例えば,フェイルオーバー後に,指定されたシーケンス識別子からの再生を要求して計算を再開したり,演算子の状態の永続化や復元を要求したりすることができます.
The role of IDisposable in Rx is really a means to visit an operator graph for purposes of stopping the computation and disposing resources. For more complex implementations, such as reliable Rx, the lifecycle of query operator instances requires additional visitor passes, for example to restart the computation after a failover by requesting replay from a specified sequence identifier, or to request persistence or restoration of operator state.
これらの観察に基づき、私たちは、コアの演算子ライブラリとホスティング環境の間に明確な境界線を導入するという設計にたどり着きました。問い合わせ式のライフサイクル管理に関する懸念は、以下のようなホスティング環境に委ねられています。
Based on these observations, we landed on a design where we introduced a crisp boundary between the core operator library and the hosting environment. Concerns around managing the lifecycle of a query expression are left to the hosting environment, including:
クエリ式のバインディングとコンパイル
Binding and compilation of query expressions.
ソースからイベントのリプレイを要求する機能
Facilities to request replay of events from sources.
オペレータの状態を保存、回復するための状態の永続化の実装
State persistence implementations to store and recover operator state.
これにより、中核となる問い合わせ演算子ライブラリは、演算子ライブラリとホスティング環境の間のエッジで維持されるシーケンス識別子を扱う必要がなくなりました。言い換えれば、リアクティブなアーティファクトのためのパブリックなインターフェースはパブリックなRxと密接に連携し、ホスティング環境で使用するためのインターフェースには様々なクロスカットの関心事が組み込まれています。
This reliefs the core query operator library from having to deal with sequence identifiers, which are maintained on the edge between the operator library and the hosting environment. In other words, the public interfaces for reactive artifacts were kept closely aligned with public Rx, while the interfaces for use by the hosting environment incorporate various crosscutting concerns.
この関心事の分離は、Rxが仮想時間やアプリケーション時間を扱う際に採用しているアプローチと非常によく似ていることに注意してください。演算子を介して「タイムスタンプ付きのイベント」を流す他のシステムとは異なり、Rxはスケジューラを使用して、計算グラフに流れ込むイベントの順序を確立することができます。これは、前述したLINQ to Traces(Tx)の基礎にもなっています。
Note that this separation of concerns is very similar to the approach taken by Rx to deal with virtual time or application time. Unlikely other systems that flow “events with timestamps” through operators, Rx can use schedulers to establish an order for the events flowing in to a computation graph. This also forms the foundation for LINQ to Traces (Tx), as mentioned before.
最終的には,上述のIReliableインターフェース(若干の違いはありますが)を,エンジン間の通信をサポートするために実装し,問い合わせ評価者のチェックポイント&リプレイ信頼性モデルをサポートしました。受信側の観測可能なシーケンスはシーケンスIDとリプレイをサポートし(上流のノードが提供)、一方、送信側の観測者はシーケンスIDを導入し(下流のノードが使用)、評価者が独立して失敗できるようにしました。イベントが入ってくると、クエリエバリュエータは、生のイベントをコア・オペレータ・ライブラリに渡す前に、シーケンスIDを振って、処理された最新のシーケンスID(high water mark)の追跡を行います。イベントが放出されると、クエリエバリュエータはシーケンスIDを導入し、下流のノードが再生できるように再生バッファを満たします。このアプローチにより、クエリ演算子の構成が計算ノード間の物理的な通信層にまで及ぶようになり、ドット・パイプの等価性が実現されました。
Ultimately, we landed the IReliable interfaces (with some minor differences) shown above to support cross-engine communication, supporting a checkpoint-and-replay reliability model for the query evaluators. Incoming observable sequences support sequence IDs and replay (provided by upstream nodes), while outgoing observers introduce sequence IDs (for use by downstream nodes), allowing evaluators to fail independently. As events come in, the query evaluator shakes off sequence IDs before handing off raw events to the core operator library, keeping tracking of the latest sequence ID processed (a high water mark). As events are emitted, the query evaluator introduces sequence IDs and fills a replay buffer from which downstream nodes can replay. This approach brought composition of query operators to the physical communication layer across compute nodes, thus realizing the dot-pipe equivalence.
IRP は同じ手法を用いて、クエリエンジンの境界で異なる関心事を「振り払う」ことができます。例えば、クエリは式木の形でエンジンに届きますが、最終的にはコンパイルされて評価されます(つまり、QubscriptionのQがSubscriptionのSに変わるというように、式木の表現が振り落とされます)。同様に、クエリはIAsync空間のエンジン境界で終了しますが、計算がローカル評価のためにノードに着地すると、同型の同期空間に移動します(したがって、非同期の性質を振り払うことになります、つまり、AsyncSubscriptionのAsyncを削除してSubscriptionになります)。イメージとしては、太いパイプが箱(クエリエンジン)の境界で細いパイプに変わり、引用、非同期、シーケンス番号などの側面が箱の壁にぶつかり、本質だけが箱の中心部(演算子ライブラリ)に流れていくような感じです。
IRP uses the same trick to “shake off” different concerns at the boundary of a query engine. For example, queries arrive at the engine in expression tree form, but end up being compiled and evaluated (thus shaking off the expression tree representation, i.e. the Q in Qubscription turns into an S in Subscription). Similarly, queries end up at engine boundaries in the IAsync space, but once the computation lands on a node for local evaluation, we move to the isomorphic synchronous space (thus shaking off the async nature, i.e. the Async in AsyncSubscription is dropped to end up with Subscription). A mental image is a fat wide pipe turning into a slim narrow pipe at the boundary of a box (the query engine), where aspects like quotation, asynchrony, sequence numbers, etc. smash into the wall of the box, and only the essence flows through into the heart of the box (the operator library).
哲学的には、IRP空間全体はN次元のハイパーキューブであり、6つの「標準モデル」のリアクティブアーティファクトのためのインターフェイスのファミリーで構成されたポイントがあります。古典的なRxは、このハイパーキューブの中の2つのポイントに過ぎません(IObservable<T>対IQbservable<T>、引用サポートの2つの軸に沿って移動)。他の直交する関心事は、sync 対 async、intrinsic 対 extrinsic identifiers、reliable or non-reliable、そして途中で発見されたいくつかのより難解なものです。
Philosophically, the whole IRP space is a hypercube in N dimensions with points populated by a family of interfaces for the six “Standard Model” reactive artifacts. Classic Rx is just two points in this hypercube (for IObservable<T> versus IQbservable<T>, moving along the binary axis of quotation support). Other orthogonal concerns are sync versus async, intrinsic versus extrinsic identifiers, reliable or non-reliable, and a few more esoteric ones that were discovered along the way.